package org.codehaus.activemq.ra;

import java.util.ArrayList;
import java.util.LinkedList;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.XASession;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQQueue;
import org.codehaus.activemq.message.ActiveMQTopic;

public class ActiveMQAsfEndpointWorker extends ActiveMQBaseEndpointWorker {
	private static final Log log = LogFactory.getLog(ActiveMQAsfEndpointWorker.class);
	private static final int MAX_MSGS_PER_SESSION = 1;
	private static final int MAX_SESSION = 10;
	ConnectionConsumer consumer;
	private ServerSessionPoolImpl serverSessionPool;

	public ActiveMQAsfEndpointWorker(ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key)
			throws ResourceException {
		super(adapter, key);
	}

	public void start() throws WorkException, ResourceException {
		log.debug("Starting");
		boolean ok = false;
		try {
			this.serverSessionPool = new ServerSessionPoolImpl();
			ActiveMQActivationSpec activationSpec = this.endpointActivationKey.getActivationSpec();

			Destination dest = null;

			if ("javax.jms.Queue".equals(activationSpec.getDestinationType()))
				dest = new ActiveMQQueue(activationSpec.getDestinationName());
			else if ("javax.jms.Topic".equals(activationSpec.getDestinationType()))
				dest = new ActiveMQTopic(activationSpec.getDestinationName());
			else {
				throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
			}

			if (emptyToNull(activationSpec.getDurableSubscriptionName()) != null)
				this.consumer = this.adapter.getPhysicalConnection().createDurableConnectionConsumer((Topic) dest,
						activationSpec.getDurableSubscriptionName(), emptyToNull(activationSpec.getMessageSelector()),
						this.serverSessionPool, 1);
			else {
				this.consumer = this.adapter.getPhysicalConnection().createConnectionConsumer(dest,
						emptyToNull(activationSpec.getMessageSelector()), this.serverSessionPool, 1);
			}

			ok = true;
			log.debug("Started");
		} catch (JMSException e) {
			throw new ResourceException("Could not start the endpoint.", e);
		} finally {
			if (!ok)
				safeClose(this.consumer);
		}
	}

	public void stop() throws InterruptedException {
		safeClose(this.consumer);
		this.serverSessionPool.close();
	}

	private String emptyToNull(String value) {
		if ("".equals(value)) {
			return null;
		}
		return value;
	}

	class ServerSessionImpl implements ServerSession, Work, MessageListener {
		Session session;
		private final ActiveMQAsfEndpointWorker.ServerSessionPoolImpl pool;
		private Object runControlMutex = new Object();
		boolean workPendingFlag = false;
		boolean runningFlag = false;
		int runCounter = 0;
		XAResource xaResource;

		public ServerSessionImpl(ActiveMQAsfEndpointWorker.ServerSessionPoolImpl pool, Session session)
				throws JMSException {
			this.pool = pool;
			this.session = session;
			this.session.setMessageListener(this);
			if ((session instanceof XASession))
				this.xaResource = ((XASession) session).getXAResource();
		}

		public Session getSession() throws JMSException {
			return this.session;
		}

	public void start()
      throws JMSException
    {
      ActiveMQAsfEndpointWorker.log.debug("ServerSession started.");
      synchronized (this.runControlMutex) {
        this.runCounter += 1;

        if ((this.runningFlag) || (this.workPendingFlag))
        {
          this.workPendingFlag = true;
          ActiveMQAsfEndpointWorker.log.debug("ServerSession allready running.");
          return;
        }
        this.workPendingFlag = true;
      }

      ActiveMQAsfEndpointWorker.log.debug("ServerSession queuing request for a run.");
      try {
        ActiveMQAsfEndpointWorker.this.workManager.scheduleWork(this, 9223372036854775807L, null, new ActiveMQAsfEndpointWorker1(this));
      }
      catch (Exception e)
      {
        throw ((JMSException)new JMSException("Work could not be started: " + e).initCause(e));
      }
    }

		public void run() {
			while (true) {
				synchronized (this.runControlMutex) {
					this.workPendingFlag = false;
					this.runningFlag = true;
				}

				ActiveMQAsfEndpointWorker.log.debug("Running: " + this);
				this.session.run();

				synchronized (this.runControlMutex) {
					this.runCounter -= 1;
					this.runningFlag = false;
					if (!this.workPendingFlag)
						if (this.runCounter == 0)
							this.pool.returnToPool(this);
				}
			}
		}

		public void release() {
			ActiveMQAsfEndpointWorker.log.debug("release called");
		}

		public void onMessage(Message message) {
			try {
				MessageEndpoint endpoint = ActiveMQAsfEndpointWorker.this.endpointFactory
						.createEndpoint(this.xaResource);
				MessageListener listener = (MessageListener) endpoint;

				endpoint.beforeDelivery(ActiveMQBaseEndpointWorker.ON_MESSAGE_METHOD);
				try {
					listener.onMessage(message);
				} finally {
					endpoint.afterDelivery();
				}
			} catch (NoSuchMethodException e) {
				ActiveMQAsfEndpointWorker.log.info(e);
			} catch (ResourceException e) {
				ActiveMQAsfEndpointWorker.log.info(e);
			}
		}

		public String toString() {
			return "ServerSessionImpl[session=" + this.session + "]";
		}
	}

	class ServerSessionPoolImpl implements ServerSessionPool {
		ActiveMQAsfEndpointWorker.ServerSessionImpl ss;
		ArrayList idleSessions = new ArrayList();
		LinkedList activeSessions = new LinkedList();
		int sessionIds = 0;
		int nextUsedSession;
		boolean closing = false;

		public ServerSessionPoolImpl() {
		}

		public ActiveMQAsfEndpointWorker.ServerSessionImpl createServerSessionImpl() throws JMSException {
			Session session = ActiveMQAsfEndpointWorker.this.adapter.getPhysicalConnection().createSession(true, 0);
			return new ActiveMQAsfEndpointWorker.ServerSessionImpl(this, session);
			// return new
			// ActiveMQAsfEndpointWorker.ServerSessionImpl(ActiveMQAsfEndpointWorker.this,
			// this, session);
		}

		public synchronized ServerSession getServerSession() throws JMSException {
			ActiveMQAsfEndpointWorker.log.debug("ServerSession requested.");
			if (this.closing) {
				throw new JMSException("Session Pool Shutting Down.");
			}
			if (this.idleSessions.size() > 0) {
				ActiveMQAsfEndpointWorker.ServerSessionImpl ss = (ActiveMQAsfEndpointWorker.ServerSessionImpl) this.idleSessions
						.remove(this.idleSessions.size() - 1);
				this.activeSessions.addLast(ss);
				ActiveMQAsfEndpointWorker.log.debug("Using idle session: " + ss);
				return ss;
			}

			if (this.activeSessions.size() >= 10) {
				ActiveMQAsfEndpointWorker.ServerSessionImpl ss = (ActiveMQAsfEndpointWorker.ServerSessionImpl) this.activeSessions
						.removeFirst();
				this.activeSessions.addLast(ss);
				ActiveMQAsfEndpointWorker.log.debug("Reusing an active session: " + ss);
				return ss;
			}
			ActiveMQAsfEndpointWorker.ServerSessionImpl ss = createServerSessionImpl();
			this.activeSessions.addLast(ss);
			ActiveMQAsfEndpointWorker.log.debug("Created a new session: " + ss);
			return ss;
		}

		public synchronized void returnToPool(ActiveMQAsfEndpointWorker.ServerSessionImpl ss) {
			ActiveMQAsfEndpointWorker.log.debug("Session returned to pool: " + ss);
			this.idleSessions.add(ss);
		}

		public void close() {
			synchronized (this) {
				this.closing = true;
			}
		}
	}
}